Web UI not working when multiprocessing queue has > 320 in size #3137

Open · 2 tasks done
aulia-adil opened this issue May 13, 2025 · 4 comments

aulia-adil commented May 13, 2025

Description

I tried to do load testing with Locust for an online incremental machine learning system at large scale. The input to the system can't be random, so I have prepared a dataset to serve as the input. The dataset has to be loaded before the load test begins, so I put it at the top of the file as a global variable, and I use a queue because it makes it easy to share one list among many concurrent virtual users.

So I do this in my code:

from multiprocessing import Queue

import pandas as pd
from locust import HttpUser, task

dataset_dir = "/root/Datasets/csv/"
file_name = "AGR_a_testing.csv"

file_path = dataset_dir + file_name
df = pd.read_csv(file_path, sep=',', header=0)
df_queue = Queue()
i = 0
for record in df.to_dict('records'):
    df_queue.put(record)
    i += 1
    if i == 320:
        break

class MLPredictionUser(HttpUser):

    @task
    def make_prediction_request(self):
        while not df_queue.empty():
            try:
                X = df_queue.get()
                # and so on...

What makes it interesting is that if I break the loop at i == 320, the web UI works fine:

[Screenshot: the web UI loads normally]

But if I break at i == 321, the web UI won't load:

[Screenshot: the web UI fails to load]

Please help me solve this problem.

Sincerely,

Adil

Command line

locust -f locust-test.py --web-host=0.0.0.0 --web-port=8089

Locustfile contents

(Same as the code shown under Description above.)

Python version

3.12.3

Locust version

locust==2.37.0 locust-cloud==1.21.2

Operating system

Ubuntu 24.04 on WSL

aulia-adil added the bug label May 13, 2025

aulia-adil commented May 13, 2025

So I changed the line df_queue = Queue() to:

import multiprocessing

manager = multiprocessing.Manager()
df_queue = manager.Queue()

And now this error happens:

Traceback (most recent call last):
  File "/root/.venv/bin/locust", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/root/.venv/lib/python3.12/site-packages/locust/main.py", line 167, in main
    ) = merge_locustfiles_content(locustfiles)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/.venv/lib/python3.12/site-packages/locust/main.py", line 123, in merge_locustfiles_content
    user_classes, shape_classes = load_locustfile(_locustfile)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/.venv/lib/python3.12/site-packages/locust/util/load_locustfile.py", line 70, in load_locustfile
    loader.exec_module(imported)
  File "<frozen importlib._bootstrap_external>", line 995, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/root/MLOps-Architecture/Serialization_Datasets/locust-test.py", line 34, in <module>
    df_queue = manager.Queue()
               ^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/managers.py", line 726, in temp
    token, exp = self._create(typeid, *args, **kwds)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/managers.py", line 606, in _create
    conn = self._Client(self._address, authkey=self._authkey)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/connection.py", line 525, in Client
    answer_challenge(c, authkey)
...
  File "/usr/lib/python3.12/multiprocessing/connection.py", line 395, in _recv
    chunk = read(handle, remaining)
            ^^^^^^^^^^^^^^^^^^^^^^^
BlockingIOError: [Errno 11] Resource temporarily unavailable

Can you please help me solve this problem?

cyberw (Collaborator) commented May 13, 2025

Hi! These are both interesting issues, but I think they are more related to your specific use case in combination with gevent and not so much to Locust specifically. I don't have much experience with pandas.

The first issue you're having might be Locust related. Can you try moving the initialization code from module level into the init event?
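
For reference, a minimal sketch of what that suggestion could look like, reusing the path and queue from the locustfile above (untested here). One plausible explanation for the ~320-record threshold, stated as an assumption rather than a confirmed diagnosis: multiprocessing.Queue hands items to a background feeder thread that writes pickled records into an OS pipe, and under gevent's monkey-patching that thread's blocking write can stall the whole event loop, web UI included, once the pipe buffer (commonly 64 KiB on Linux) fills.

from multiprocessing import Queue

import pandas as pd
from locust import events

df_queue = Queue()

@events.init.add_listener
def on_locust_init(environment, **kwargs):
    # Runs once per Locust process after startup, instead of at import time,
    # so Locust (and gevent) are fully set up before the queue is filled.
    df = pd.read_csv("/root/Datasets/csv/AGR_a_testing.csv", sep=",", header=0)
    for record in df.to_dict("records"):
        df_queue.put(record)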

cgoldberg (Member) commented:

You probably want to use a gevent.queue.Queue rather than a multiprocessing.Queue

https://www.gevent.org/api/gevent.queue.html
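
A minimal sketch of that swap, reusing the names from the locustfile above (untested here). gevent.queue.Queue is cooperative with Locust's greenlets, and a put on an unbounded queue never does a blocking OS-level write:

from gevent.queue import Empty, Queue

import pandas as pd
from locust import HttpUser, task

df = pd.read_csv("/root/Datasets/csv/AGR_a_testing.csv", sep=",", header=0)

# A gevent-native queue shared by all simulated users in this process.
df_queue = Queue()
for record in df.to_dict("records"):
    df_queue.put(record)

class MLPredictionUser(HttpUser):
    @task
    def make_prediction_request(self):
        try:
            # Non-blocking get; raises Empty once the dataset is exhausted.
            X = df_queue.get(block=False)
        except Empty:
            return
        # and so on...

Note that, like the threading-based workaround below, this shares the dataset only within a single Locust process; distributed runs with workers would each load their own copy.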

aulia-adil (Author) commented:

Thank you so much @cyberw and @cgoldberg for the suggestions. After asking Claude 3.7 Sonnet, I found a workaround for my problem: simply using threading.RLock(). In my case it isn't distributed load testing, just a simple load test from one machine, though Claude said that multiprocessing.Manager().Queue() would be more suitable for distributed load testing. For now I have a working solution, so please feel free to close this issue.

Sincerely,
Adil

My workaround solution

import csv
import threading

class SequentialDatasetReader:
    """
    Thread-safe sequential dataset reader for concurrent virtual users.
    Ensures each record is processed exactly once in original sequence.
    """
    def __init__(self, dataset_path, cycle=True):
        """
        Initialize the dataset reader.
        
        Args:
            dataset_path: Path to the CSV dataset file
            cycle: If True, restart from beginning when dataset is exhausted
        """
        self.dataset_path = dataset_path
        self.cycle = cycle
        self.lock = threading.RLock()  # Reentrant lock for thread safety
        self.data = []
        self.current_index = 0
        self._load_data()
    
    def _load_data(self):
        """Load all dataset records into memory."""
        with open(self.dataset_path, 'r') as file:
            reader = csv.DictReader(file)
            self.data = list(reader)
            
        if not self.data:
            raise ValueError("Dataset is empty")
    
    def get_next_record(self):
        """
        Thread-safely get the next record from the dataset.
        
        Returns:
            The next record as a dictionary, or None if dataset is exhausted and cycle=False
        """
        with self.lock:
            # Check if we've reached the end
            if self.current_index >= len(self.data):
                if not self.cycle:
                    return None
                # Reset to beginning if cycling
                self.current_index = 0
            
            # Get the next record and increment the counter
            record = self.data[self.current_index]
            self.current_index += 1
            return record
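
For completeness, a hypothetical usage sketch of the reader inside the user class from the original locustfile (the wiring and path mirror the earlier example):

from locust import HttpUser, task

reader = SequentialDatasetReader("/root/Datasets/csv/AGR_a_testing.csv", cycle=True)

class MLPredictionUser(HttpUser):
    @task
    def make_prediction_request(self):
        X = reader.get_next_record()
        if X is None:  # only possible when cycle=False
            return
        # and so on...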
